/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.schemas;



import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.annotations.Experimental.Kind;

import com.bff.gaia.unified.sdk.schemas.annotations.DefaultSchema;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.util.common.ReflectHelpers;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TypeDescriptor;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Maps;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Sets;

import com.bff.gaia.unified.sdk.values.PCollection;



import javax.annotation.Nullable;

import java.util.*;

import java.util.function.Function;



/**
 * A {@link SchemaRegistry} allows registering {@link Schema}s for a given Java {@link Class} or a
 * {@link TypeDescriptor}.
 *
 * <p>Types registered in a pipeline's schema registry will automatically be discovered by any
 * {@link PCollection} that uses {@link SchemaCoder}. This allows users to write pipelines in terms
 * of their own Java types, yet still register schemas for these types.
 *
 * <p>A lookup consults the following sources, in order, and returns the first non-null answer:
 *
 * <ol>
 *   <li>schemas registered explicitly through {@link #registerSchemaForType} or {@link
 *       #registerSchemaForClass};
 *   <li>providers registered through {@link #registerSchemaProvider(SchemaProvider)}, most
 *       recently registered first;
 *   <li>providers registered for one specific type (including {@link #registerPOJO} and {@link
 *       #registerJavaBean}), which also apply to subclasses of that type;
 *   <li>providers discovered when this class is initialized: those of {@link
 *       DefaultSchema.DefaultSchemaProviderRegistrar}, followed by those of every {@link
 *       SchemaProviderRegistrar} found through {@link ServiceLoader}.
 * </ol>
 *
 * <p>This class performs no synchronization of its own.
 */
@Experimental(Kind.SCHEMAS)
public class SchemaRegistry {
  /** Providers discovered once, at class-initialization time; shared by every registry. */
  private static final List<SchemaProvider> REGISTERED_SCHEMA_PROVIDERS;

  /** An explicitly registered schema together with its two conversion functions. */
  private static class SchemaEntry<T> {
    private final Schema schema;
    private final SerializableFunction<T, Row> toRow;
    private final SerializableFunction<Row, T> fromRow;

    SchemaEntry(
        Schema schema, SerializableFunction<T, Row> toRow, SerializableFunction<Row, T> fromRow) {
      this.schema = schema;
      this.toRow = toRow;
      this.fromRow = fromRow;
    }
  }

  /**
   * A {@link SchemaProvider} that delegates to providers registered for individual types.
   *
   * <p>A provider registered for a type also answers for that type's subclasses: each lookup walks
   * up the superclass chain (stopping before {@link Object}) until it reaches a type that has a
   * provider. The delegate is always queried with the type it was registered under, not with the
   * subclass that was originally asked about.
   */
  private static class PerTypeRegisteredProvider implements SchemaProvider {
    private final Map<TypeDescriptor<?>, SchemaProvider> providers = Maps.newHashMap();

    /** Associates {@code schemaProvider} with {@code typeDescriptor}, replacing any previous one. */
    void registerProvider(TypeDescriptor<?> typeDescriptor, SchemaProvider schemaProvider) {
      providers.put(typeDescriptor, schemaProvider);
    }

    /**
     * Finds the closest type, starting at {@code typeDescriptor} itself and moving up through its
     * superclasses, that has a provider registered.
     *
     * @return the registered type, or {@code null} if the chain reaches {@link Object} (or runs
     *     out of superclasses) without finding one
     */
    @Nullable
    private TypeDescriptor<?> nearestRegisteredType(TypeDescriptor<?> typeDescriptor) {
      TypeDescriptor<?> type = typeDescriptor;
      while (true) {
        if (providers.get(type) != null) {
          return type;
        }
        Class<?> superClass = type.getRawType().getSuperclass();
        // getSuperclass() is null for interfaces, primitives and Object itself.
        if (superClass == null || superClass.equals(Object.class)) {
          return null;
        }
        type = TypeDescriptor.of(superClass);
      }
    }

    /**
     * Returns the schema supplied by the provider registered for {@code typeDescriptor} or its
     * nearest registered superclass, or {@code null} if there is no such provider.
     */
    @Nullable
    @Override
    public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
      TypeDescriptor<?> registeredType = nearestRegisteredType(typeDescriptor);
      if (registeredType == null) {
        return null;
      }
      return providers.get(registeredType).schemaFor(registeredType);
    }

    /**
     * Returns the to-{@link Row} function supplied by the provider registered for {@code
     * typeDescriptor} or its nearest registered superclass, or {@code null} if there is no such
     * provider.
     */
    @Nullable
    @Override
    // The delegate is queried with the registered (possibly super-) type, so its result is
    // expressed in terms of that type; callers asked for T, hence the unchecked cast.
    @SuppressWarnings("unchecked")
    public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
      TypeDescriptor<?> registeredType = nearestRegisteredType(typeDescriptor);
      if (registeredType == null) {
        return null;
      }
      return (SerializableFunction<T, Row>)
          providers.get(registeredType).toRowFunction(registeredType);
    }

    /**
     * Returns the from-{@link Row} function supplied by the provider registered for {@code
     * typeDescriptor} or its nearest registered superclass, or {@code null} if there is no such
     * provider.
     */
    @Nullable
    @Override
    // See toRowFunction: the delegate answers for the registered type, callers expect T.
    @SuppressWarnings("unchecked")
    public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
      TypeDescriptor<?> registeredType = nearestRegisteredType(typeDescriptor);
      if (registeredType == null) {
        return null;
      }
      return (SerializableFunction<Row, T>)
          providers.get(registeredType).fromRowFunction(registeredType);
    }
  }

  /** Explicitly registered schemas; these take precedence over every provider. */
  private final Map<TypeDescriptor<?>, SchemaEntry<?>> entries = Maps.newHashMap();

  /** Providers in lookup order (head is consulted first). */
  private final Deque<SchemaProvider> providers;

  private final PerTypeRegisteredProvider perTypeRegisteredProviders =
      new PerTypeRegisteredProvider();

  private SchemaRegistry() {
    providers = new ArrayDeque<>(REGISTERED_SCHEMA_PROVIDERS);
    // Per-type registrations are consulted before the statically discovered providers.
    providers.addFirst(perTypeRegisteredProviders);
  }

  /** Creates a registry pre-populated with the statically discovered {@link SchemaProvider}s. */
  public static SchemaRegistry createDefault() {
    return new SchemaRegistry();
  }

  /** Register a schema for a specific {@link Class} type. */
  public <T> void registerSchemaForClass(
      Class<T> clazz,
      Schema schema,
      SerializableFunction<T, Row> toRow,
      SerializableFunction<Row, T> fromRow) {
    registerSchemaForType(TypeDescriptor.of(clazz), schema, toRow, fromRow);
  }

  /**
   * Register a schema for a specific {@link TypeDescriptor} type.
   *
   * <p>A later registration for the same type replaces the earlier one.
   */
  public <T> void registerSchemaForType(
      TypeDescriptor<T> type,
      Schema schema,
      SerializableFunction<T, Row> toRow,
      SerializableFunction<Row, T> fromRow) {
    entries.put(type, new SchemaEntry<T>(schema, toRow, fromRow));
  }

  /**
   * Register a {@link SchemaProvider}.
   *
   * <p>A {@link SchemaProvider} allows for deferred lookups of per-type schemas. This can be used
   * when schemas are registered in an external service. The SchemaProvider will lookup the type in
   * the external service and return the correct {@link Schema}.
   *
   * <p>The provider is placed at the front of the lookup order, ahead of all previously registered
   * providers.
   */
  public void registerSchemaProvider(SchemaProvider schemaProvider) {
    providers.addFirst(schemaProvider);
  }

  /** Register a {@link SchemaProvider} to be used for a specific type. */
  public <T> void registerSchemaProvider(Class<T> clazz, SchemaProvider schemaProvider) {
    registerSchemaProvider(TypeDescriptor.of(clazz), schemaProvider);
  }

  /** Register a {@link SchemaProvider} to be used for a specific type. */
  public <T> void registerSchemaProvider(
      TypeDescriptor<T> typeDescriptor, SchemaProvider schemaProvider) {
    perTypeRegisteredProviders.registerProvider(typeDescriptor, schemaProvider);
  }

  /**
   * Register a POJO type for automatic schema inference.
   *
   * <p>Currently schema field names will match field names in the POJO, and all fields must be
   * mutable (i.e. no final fields).
   */
  public <T> void registerPOJO(Class<T> clazz) {
    registerPOJO(TypeDescriptor.of(clazz));
  }

  /**
   * Register a POJO type for automatic schema inference.
   *
   * <p>Currently schema field names will match field names in the POJO, and all fields must be
   * mutable (i.e. no final fields). The Java object is expected to have implemented correct
   * .equals() and .hashCode() methods. The equals method must be completely determined by the
   * schema fields, i.e. if the object has hidden fields that are not reflected in the schema but
   * are compared in equals, then results will be incorrect.
   */
  public <T> void registerPOJO(TypeDescriptor<T> typeDescriptor) {
    registerSchemaProvider(typeDescriptor, new JavaFieldSchema());
  }

  /**
   * Register a JavaBean type for automatic schema inference.
   *
   * <p>Currently schema field names will match getter names in the bean, and all getters must have
   * matching setters. The Java object is expected to have implemented correct .equals() and
   * .hashCode() methods. The equals method must be completely determined by the schema fields,
   * i.e. if the object has hidden fields that are not reflected in the schema but are compared in
   * equals, then results will be incorrect.
   */
  public <T> void registerJavaBean(Class<T> clazz) {
    registerJavaBean(TypeDescriptor.of(clazz));
  }

  /**
   * Register a JavaBean type for automatic schema inference.
   *
   * <p>Currently schema field names will match getter names in the bean, and all getters must have
   * matching setters.
   */
  public <T> void registerJavaBean(TypeDescriptor<T> typeDescriptor) {
    registerSchemaProvider(typeDescriptor, new JavaBeanSchema());
  }

  /**
   * Looks up the explicitly registered entry for {@code typeDescriptor}.
   *
   * @return the entry, or {@code null} if no schema was registered explicitly for this type
   */
  @Nullable
  // registerSchemaForType is the only writer of 'entries' and always stores a SchemaEntry<T>
  // under a TypeDescriptor<T>, so the entry found for a TypeDescriptor<T> is a SchemaEntry<T>.
  @SuppressWarnings("unchecked")
  private <T> SchemaEntry<T> lookupEntry(TypeDescriptor<T> typeDescriptor) {
    return (SchemaEntry<T>) entries.get(typeDescriptor);
  }

  /**
   * Applies {@code f} to each provider in lookup order and returns the first non-null result.
   *
   * @throws NoSuchSchemaException if every provider returns {@code null}
   */
  private <ReturnT> ReturnT getProviderResult(Function<SchemaProvider, ReturnT> f)
      throws NoSuchSchemaException {
    for (SchemaProvider provider : providers) {
      ReturnT result = f.apply(provider);
      if (result != null) {
        return result;
      }
    }
    throw new NoSuchSchemaException();
  }

  /**
   * Get a schema for a given {@link Class} type. If no schema exists, throws {@link
   * NoSuchSchemaException}.
   */
  public <T> Schema getSchema(Class<T> clazz) throws NoSuchSchemaException {
    return getSchema(TypeDescriptor.of(clazz));
  }

  /**
   * Retrieve a schema for a given {@link TypeDescriptor} type. If no schema exists, throws {@link
   * NoSuchSchemaException}.
   */
  public <T> Schema getSchema(TypeDescriptor<T> typeDescriptor) throws NoSuchSchemaException {
    SchemaEntry<T> entry = lookupEntry(typeDescriptor);
    if (entry != null) {
      return entry.schema;
    }
    return getProviderResult((SchemaProvider p) -> p.schemaFor(typeDescriptor));
  }

  /**
   * Retrieve the function that converts an object of the specified type to a {@link Row} object.
   * If no schema exists, throws {@link NoSuchSchemaException}.
   */
  public <T> SerializableFunction<T, Row> getToRowFunction(Class<T> clazz)
      throws NoSuchSchemaException {
    return getToRowFunction(TypeDescriptor.of(clazz));
  }

  /**
   * Retrieve the function that converts an object of the specified type to a {@link Row} object.
   * If no schema exists, throws {@link NoSuchSchemaException}.
   */
  public <T> SerializableFunction<T, Row> getToRowFunction(TypeDescriptor<T> typeDescriptor)
      throws NoSuchSchemaException {
    SchemaEntry<T> entry = lookupEntry(typeDescriptor);
    if (entry != null) {
      return entry.toRow;
    }
    return getProviderResult((SchemaProvider p) -> p.toRowFunction(typeDescriptor));
  }

  /**
   * Retrieve the function that converts a {@link Row} object to the specified type. If no schema
   * exists, throws {@link NoSuchSchemaException}.
   */
  public <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> clazz)
      throws NoSuchSchemaException {
    return getFromRowFunction(TypeDescriptor.of(clazz));
  }

  /**
   * Retrieve the function that converts a {@link Row} object to the specified type. If no schema
   * exists, throws {@link NoSuchSchemaException}.
   */
  public <T> SerializableFunction<Row, T> getFromRowFunction(TypeDescriptor<T> typeDescriptor)
      throws NoSuchSchemaException {
    SchemaEntry<T> entry = lookupEntry(typeDescriptor);
    if (entry != null) {
      return entry.fromRow;
    }
    return getProviderResult((SchemaProvider p) -> p.fromRowFunction(typeDescriptor));
  }

  static {
    // Find all statically-registered SchemaProviders.
    List<SchemaProvider> providersToRegister = Lists.newArrayList();

    // Find all SchemaProviderRegistrar classes that are registered as service loaders
    // (usually using the @AutoService annotation). They are collected into a set sorted by
    // ReflectHelpers.ObjectsClassComparator rather than kept in ServiceLoader iteration order.
    Set<SchemaProviderRegistrar> registrars =
        Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
    registrars.addAll(
        Lists.newArrayList(
            ServiceLoader.load(SchemaProviderRegistrar.class, ReflectHelpers.findClassLoader())));

    // Load all SchemaProviders that are registered using the @DefaultSchema annotation. These go
    // first, so they are consulted before the providers of the service-loaded registrars.
    providersToRegister.addAll(
        new DefaultSchema.DefaultSchemaProviderRegistrar().getSchemaProviders());
    for (SchemaProviderRegistrar registrar : registrars) {
      providersToRegister.addAll(registrar.getSchemaProviders());
    }

    REGISTERED_SCHEMA_PROVIDERS = ImmutableList.copyOf(providersToRegister);
  }
}